1 /* 2 Copyright: Marcelo S. N. Mancini (Hipreme|MrcSnm), 2018 - 2021 3 License: [https://creativecommons.org/licenses/by/4.0/|CC BY-4.0 License]. 4 Authors: Marcelo S. N. Mancini 5 6 Copyright Marcelo S. N. Mancini 2018 - 2021. 7 Distributed under the CC BY-4.0 License. 8 (See accompanying file LICENSE.txt or copy at 9 https://creativecommons.org/licenses/by/4.0/ 10 */ 11 module hip.concurrency.thread; 12 import hip.concurrency.mutex; 13 import hip.config.opts; 14 15 static if(HipConcurrency) 16 { 17 import core.thread; 18 import core.atomic; 19 import core.sync.semaphore:Semaphore; 20 21 class HipWorkerThread : Thread 22 { 23 private struct WorkerJob 24 { 25 string name; 26 void delegate() task; 27 void delegate(string taskName) onTaskFinish; 28 } 29 private WorkerJob[] jobsQueue; 30 private Semaphore semaphore; 31 private bool isAlive = false; 32 33 private int jobsCount; 34 private DebugMutex mutex; 35 private HipWorkerPool pool; 36 private ThreadID mainThreadID; 37 38 39 this(HipWorkerPool pool = null, ThreadID mainThreadID = ThreadID.init) 40 { 41 super(&run); 42 if(pool) 43 this.pool = pool; 44 isAlive = true; 45 semaphore = new Semaphore; 46 this.mainThreadID = mainThreadID; 47 mutex = new DebugMutex(mainThreadID); 48 } 49 /** 50 * This thread goes into an invalid state after finishing it. It should not be used anymore 51 */ 52 void finish() 53 { 54 isAlive.atomicStore = false; 55 semaphore.notify; 56 } 57 bool isIdle() 58 { 59 return atomicLoad(jobsCount) == 0; 60 } 61 /** 62 * Synchronized push on queue 63 */ 64 void pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null) 65 { 66 if(isAlive.atomicLoad) 67 { 68 mutex.lock(); 69 jobsQueue~= WorkerJob(name, task, onTaskFinish); 70 jobsCount++; 71 mutex.unlock(); 72 semaphore.notify(); 73 } 74 else 75 { 76 import hip.console.log; 77 logln("Thread is not alive to get tasks."); 78 } 79 } 80 81 void startWorking() 82 { 83 if(!isRunning) 84 start(); 85 } 86 void await(bool rethrow = true) 87 { 88 // pushTask("await", () => finish); 89 // join(rethrow); 90 } 91 92 void run() 93 { 94 while(isAlive) 95 { 96 if(!isIdle) 97 { 98 mutex.lock(); 99 WorkerJob job = jobsQueue[0]; 100 jobsQueue = jobsQueue[1..$]; 101 mutex.unlock(); 102 try 103 { 104 job.task(); 105 if(job.onTaskFinish != null) 106 job.onTaskFinish(job.name); 107 atomicFetchSub(jobsCount, 1); 108 } 109 catch(Error e) 110 { 111 onAnyException(true, job.name, e.toString()); 112 return; 113 } 114 catch(Exception e) 115 { 116 onAnyException(false, job.name, e.toString()); 117 return; 118 } 119 } 120 semaphore.wait; 121 } 122 } 123 124 private void onAnyException(bool isError, string jobName, string message) 125 { 126 isAlive = false; 127 if(pool) 128 pool.onHipThreadError(this, jobName, isError,message); 129 } 130 void dispose() 131 { 132 finish(); 133 destroy(semaphore); 134 destroy(mutex); 135 } 136 } 137 138 139 class HipWorkerPool 140 { 141 HipWorkerThread[] threads; 142 protected Semaphore awaitSemaphore; 143 protected void delegate()[] finishHandlersOnMainThread; 144 protected void delegate()[] onAllTasksFinishHandlers; 145 protected DebugMutex handlersMutex; 146 147 private struct Task 148 { 149 string name; 150 void delegate() task; 151 void delegate(string taskName) onTaskFinish = null; 152 153 void execTask() 154 { 155 task(); 156 if(onTaskFinish) 157 onTaskFinish(name); 158 } 159 } 160 private Task[] mainThreadTasks; 161 private uint awaitCount = 0; 162 private shared size_t tasksCount; 163 164 165 this(size_t poolSize) 166 { 167 threads = new HipWorkerThread[](poolSize); 168 import hip.concurrency.internal:thisThreadID; 169 auto mainId = thisThreadID; 170 handlersMutex = new DebugMutex(mainId); 171 for(size_t i = 0; i < poolSize; i++) 172 threads[i] = new HipWorkerThread(this, mainId); 173 awaitSemaphore = new Semaphore(0); 174 } 175 176 void addOnAllTasksFinished(void delegate() onAllFinished) 177 { 178 if(tasksCount == 0) 179 onAllFinished(); 180 else 181 onAllTasksFinishHandlers~= onAllFinished; 182 } 183 184 protected void onHipThreadError(HipWorkerThread worker, string jobName, bool isError, string message) 185 { 186 if(awaitCount > 0) 187 { 188 awaitSemaphore.notify(); 189 } 190 import hip.util.array; 191 import hip.console.log; 192 193 194 logln("Worker ", jobName, " failed with ", isError ? "error" : "exception", ":", message); 195 threads.remove(worker); 196 } 197 void await() 198 { 199 awaitCount = 0; 200 foreach(thread; threads) 201 { 202 if(!thread.isIdle) 203 { 204 thread.pushTask("Await", () 205 { 206 awaitSemaphore.notify; 207 }); 208 awaitCount++; 209 } 210 } 211 startWorking(); 212 while(awaitCount > 0) 213 { 214 awaitSemaphore.wait(); 215 awaitCount--; 216 } 217 } 218 /** 219 * Adds a task to the pool. If no idle worker is available, the task is executed on the main thread. 220 * Keep in mind that pushin task is not enough. You need to call `startWorking()` to make it active after pushing tasks 221 * Params: 222 * name = The name of the task. 223 * task = The task to execute. 224 * onTaskFinish = Callback to execute when the task completes. 225 * isOnFinishOnMainThread = If true, the callback is executed on the main thread. 226 * 227 * Returns: 228 * The worker thread handling the task or null if the task will be executed on main thread 229 */ 230 HipWorkerThread pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null, bool isOnFinishOnMainThread = false) 231 { 232 atomicFetchAdd(tasksCount, 1); 233 foreach(i, thread; threads) 234 { 235 if(thread.isIdle) 236 { 237 import hip.console.log; 238 logln("Thread [", i, "] handling task ", name); 239 if(onTaskFinish !is null && isOnFinishOnMainThread) 240 thread.pushTask(name, task, notifyOnFinishOnMainThread(onTaskFinish)); 241 else 242 thread.pushTask(name, task, notifyOnFinish(onTaskFinish)); 243 return thread; 244 } 245 } 246 handlersMutex.lock(); 247 scope(exit) handlersMutex.unlock(); 248 //Execute a main thread task if it had anything. 249 mainThreadTasks~= Task(name, task, notifyOnFinish(onTaskFinish)); 250 return null; 251 } 252 253 static if(!HIP_ASSETMANAGER_PARTIAL_LOAD) 254 { 255 protected void executeMainThreadTasks() 256 { 257 handlersMutex.lock(); 258 Task[] tasks; 259 if(mainThreadTasks.length != 0) 260 { 261 tasks = mainThreadTasks.dup; 262 mainThreadTasks.length = 0; 263 } 264 handlersMutex.unlock(); 265 foreach(t; tasks) 266 t.execTask(); 267 } 268 } 269 else 270 { 271 protected void executeMainThreadTasks() 272 { 273 handlersMutex.lock(); 274 scope(exit) handlersMutex.unlock(); 275 import hip.util.time; 276 long timeNow = HipTime.getCurrentTimeAsMsLong(); 277 278 size_t executed; 279 foreach(t; mainThreadTasks) 280 { 281 t.execTask(); 282 executed++; 283 if(HipTime.getCurrentTimeAsMsLong() - timeNow > HIP_ASSETMANAGER_MAX_PROCESS_MS) 284 break; 285 } 286 mainThreadTasks = mainThreadTasks[executed..$]; 287 } 288 } 289 290 291 /** 292 * This function should be called every time you push a task. 293 */ 294 void startWorking() 295 { 296 foreach(thread; threads) 297 if(!thread.isIdle) 298 thread.startWorking(); 299 executeMainThreadTasks(); 300 } 301 302 void delegate(string name) notifyOnFinish(void delegate(string taskName) onFinish = null) 303 { 304 return (name) 305 { 306 if(onFinish) 307 onFinish(name); 308 atomicFetchSub(tasksCount, 1); 309 }; 310 } 311 312 void delegate(string name) notifyOnFinishOnMainThread(void delegate(string taskName) onFinish, bool finished = true) 313 { 314 return (name) 315 { 316 handlersMutex.lock(); 317 finishHandlersOnMainThread~= () 318 { 319 onFinish(name); 320 if(finished) 321 atomicFetchSub(tasksCount, 1); 322 }; 323 handlersMutex.unlock(); 324 }; 325 } 326 int getTasksCount() 327 { 328 return cast(int)atomicLoad(tasksCount); 329 } 330 331 bool isIdle() 332 { 333 return atomicLoad(tasksCount) == 0; 334 } 335 336 void pollFinished() 337 { 338 handlersMutex.lock(); 339 if(finishHandlersOnMainThread.length) 340 { 341 foreach(finishHandler; finishHandlersOnMainThread) 342 finishHandler(); 343 finishHandlersOnMainThread.length = 0; 344 } 345 if(tasksCount == 0 && onAllTasksFinishHandlers.length) 346 { 347 foreach(onAllFinish; onAllTasksFinishHandlers) 348 onAllFinish(); 349 onAllTasksFinishHandlers.length = 0; 350 } 351 handlersMutex.unlock(); 352 353 } 354 355 void dispose() 356 { 357 foreach(thread; threads) 358 thread.dispose(); 359 destroy(threads); 360 destroy(awaitSemaphore); 361 destroy(handlersMutex); 362 } 363 } 364 365 } 366 else 367 { 368 369 class HipWorkerPool 370 { 371 private HipWorkerThread thread; 372 protected void delegate()[] onAllTasksFinishHandlers; 373 private void delegate()[] finishHandlersOnMainThread; 374 size_t tasksCount = 0; 375 void addOnAllTasksFinished(void delegate() onAllFinished) 376 { 377 if(tasksCount == 0) 378 onAllFinished(); 379 else 380 onAllTasksFinishHandlers~= onAllFinished; 381 } 382 383 this(size_t poolSize) 384 { 385 thread = new HipWorkerThread(this, ulong.max); 386 } 387 void delegate(string name) notifyOnFinishOnMainThread(void delegate(string taskName) onFinish, bool finished = true) 388 { 389 return (name) 390 { 391 finishHandlersOnMainThread~= () 392 { 393 onFinish(name); 394 if(finished) 395 tasksCount--; 396 }; 397 }; 398 } 399 400 void delegate(string name) notifyOnFinish(void delegate(string taskName) onFinish) 401 { 402 return (name) 403 { 404 if(onFinish) onFinish(name); 405 version(WebAssembly){} 406 else 407 tasksCount--; 408 }; 409 } 410 final void signalTaskFinish() 411 { 412 assert(tasksCount > 0, "Tried to signal task finish without tasks."); 413 tasksCount--; 414 } 415 final void await() 416 { 417 version(WebAssembly) assert(false, "Code using await does not work on WebAssembly."); 418 } 419 final void pollFinished() 420 { 421 if(finishHandlersOnMainThread.length) 422 { 423 foreach(handler; finishHandlersOnMainThread) 424 handler(); 425 finishHandlersOnMainThread.length = 0; 426 } 427 if(tasksCount == 0 && onAllTasksFinishHandlers.length) 428 { 429 foreach(onAllFinish; onAllTasksFinishHandlers) 430 onAllFinish(); 431 onAllTasksFinishHandlers.length = 0; 432 } 433 } 434 int getTasksCount() 435 { 436 return cast(int)tasksCount; 437 } 438 439 final HipWorkerThread pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null, bool isOnFinishOnMainThread = true) 440 { 441 tasksCount++; 442 version(WebAssembly) 443 assert(onTaskFinish is null, "Can't have an onTaskFinish on Wasm, implement it on a higher level using notfyOnFinish."); 444 thread.pushTask(name, task, notifyOnFinish(onTaskFinish)); 445 return thread; 446 } 447 final void startWorking(){thread.startWorking();} 448 final void finish(){} 449 final bool isIdle(){return thread.isIdle;} 450 final void dispose(){} 451 } 452 class HipWorkerThread 453 { 454 struct WorkerTask 455 { 456 void delegate() task; 457 void delegate(string taskName) onTaskFinish; 458 string name; 459 } 460 WorkerTask[] tasks; 461 462 this(HipWorkerPool pool, ulong id){} 463 final void pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null) 464 { 465 tasks~= WorkerTask(task, onTaskFinish, name); 466 } 467 468 final void startWorking() 469 { 470 import hip.util.time; 471 long timeNow = HipTime.getCurrentTimeAsMsLong(); 472 size_t executed; 473 foreach(task; tasks) 474 { 475 task.task(); 476 if(task.onTaskFinish) 477 task.onTaskFinish(task.name); 478 executed++; 479 static if(HIP_ASSETMANAGER_PARTIAL_LOAD) 480 { 481 if(HipTime.getCurrentTimeAsMsLong() - timeNow > HIP_ASSETMANAGER_MAX_PROCESS_MS) 482 break; 483 } 484 } 485 tasks = tasks[executed..$]; 486 } 487 488 bool isIdle() 489 { 490 assert(false, "HipWorkerThread is not reliable to use isIdle since on WASM it returns immediately most functions since they are processed on background."); 491 } 492 } 493 }